Setting up Amazon RDS for PostgreSQL for CDC (Change data capture)

You can use the AWS master user account for the PostgreSQL database instance as the user for the Amazon RDS for PostgreSQL source endpoint. The master user account has the required roles that allow it to set up Change Data Capture (CDC).

If you prefer not to use the master user account, the account you wish to use must have the rds_superuser role and the rds_replication role. The rds_replication role grants permissions to manage logical slots and to stream data using logical slots.
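
If you create a dedicated account rather than using the master user, the two roles can be granted from a session logged in as the master user. The following is a minimal sketch; the account name replicate_user and the password placeholder are hypothetical and are not part of the original instructions.

```sql
-- Hypothetical account name and password placeholder; replace with your own.
CREATE ROLE replicate_user WITH LOGIN PASSWORD '<password>';

-- The two roles that the source endpoint account requires for CDC.
GRANT rds_superuser TO replicate_user;
GRANT rds_replication TO replicate_user;
```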

To enable logical replication for an Amazon RDS PostgreSQL database instance:

  1. Create a new parameter group with the following configuration:
    1. rds.logical_replication – Set this parameter in the database parameter group to 1.
    2. max_wal_senders – Set this to at least the number of tasks that will run in parallel.
    3. max_replication_slots – Set this to at least the number of simultaneous connections that will be established to the database. Every connection (that is, every task) creates a new slot.
  2. Link the Amazon RDS instance to the parameter group.
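
After the instance is linked to the parameter group, the effective settings can be checked from any SQL session. This is a minimal sketch, assuming the instance has been rebooted so that the static rds.logical_replication parameter has taken effect; with that parameter set to 1, Amazon RDS sets wal_level to logical.

```sql
-- Confirm that logical replication is enabled on the instance.
SHOW rds.logical_replication;
SHOW wal_level;                -- expected: logical

-- Confirm the limits configured in the parameter group.
SHOW max_wal_senders;
SHOW max_replication_slots;

-- List the replication slots that currently exist and whether they are in use.
SELECT slot_name, plugin, active
FROM pg_replication_slots;
```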

Using an account without the "rds_superuser" role

If you are not using an account with the rds_superuser role, you need to create several objects to capture Data Definition Language (DDL) events. Create these objects in the selected account and then create a trigger in the master user account.

To do this:

  1. Choose a schema where you want the objects to be created. The default schema is public. Ensure that the schema exists and is accessible by the NoPriv account (the account that does not have the rds_superuser role).
  2. Create the table attrep_ddl_audit by running the following command:

    create table <objects_schema>.attrep_ddl_audit
    (
    c_key    bigserial primary key,
    c_time   timestamp,    -- Informational
    c_user   varchar(64),  -- Informational: current_user
    c_txn    varchar(16),  -- Informational: current transaction
    c_tag    varchar(24),  -- Either 'CREATE TABLE' or 'ALTER TABLE' or 'DROP TABLE'
    c_oid    integer,      -- For future use - TG_OBJECTID
    c_name   varchar(64),  -- For future use - TG_OBJECTNAME
    c_schema varchar(64),  -- For future use - TG_SCHEMANAME. For now, holds the current_schema
    c_ddlqry  text         -- The DDL query associated with the current DDL event
    );
  3. Create the function attrep_intercept_ddl by running the following command:

    CREATE OR REPLACE FUNCTION <objects_schema>.attrep_intercept_ddl()
      RETURNS event_trigger
      LANGUAGE plpgsql
    AS $$
    DECLARE
      _qry text;
    BEGIN
      -- Record only table-level DDL events
      IF (tg_tag = 'CREATE TABLE' OR tg_tag = 'ALTER TABLE' OR tg_tag = 'DROP TABLE') THEN
        SELECT current_query() INTO _qry;
        INSERT INTO <objects_schema>.attrep_ddl_audit
        VALUES
        (
          default, current_timestamp, current_user, cast(txid_current() AS varchar(16)), tg_tag, 0, '', current_schema, _qry
        );
        -- Clear the audit table again after each insert
        DELETE FROM <objects_schema>.attrep_ddl_audit;
      END IF;
    END;
    $$;
    
  4. If you are logged in with a NoPriv account, log out of the NoPriv account and log in with an account that has the rds_superuser role assigned to it.

    Information note

    If the attrep_intercept_ddl function is not created in the default schema, you need to specify the schema name in the Create DDL artifacts in schema field.

    For more information on the replication configuration parameters, see the PostgreSQL Help.

  5. Create the event trigger attrep_intercept_ddl by running the following command:

    CREATE EVENT TRIGGER attrep_intercept_ddl ON ddl_command_end
      EXECUTE PROCEDURE <objects_schema>.attrep_intercept_ddl();

  6. Grant the following permissions to the NoPriv account:

    • GRANT INSERT ON <objects_schema>.attrep_ddl_audit TO <NoPriv>;
    • GRANT DELETE ON <objects_schema>.attrep_ddl_audit TO <NoPriv>;
    • GRANT USAGE ON SEQUENCE <objects_schema>.attrep_ddl_audit_c_key_seq TO <NoPriv>;
    • GRANT rds_replication TO <NoPriv>;
  7. If the WAL heartbeat option is selected in the endpoint's Advanced tab, you need to complete this step and step 8 as well.

    Create the heartbeat table:

    create table <schema>.attrep_heartbeat (
    hb_key serial primary key,
    hb_created_at timestamp,       -- Informational
    hb_created_by varchar(64),     -- Informational
    hb_last_heartbeat_at timestamp,
    hb_last_heartbeat_by varchar(64));
  8. Grant usage on the schema to the NoPriv account:

    GRANT USAGE ON SCHEMA <schema> TO <NoPriv>;
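
Once the steps above are complete, the result can be checked from a session that has the rds_superuser role. This is a minimal sketch using standard PostgreSQL catalog tables and privilege functions, not part of the original procedure. Replace the <objects_schema>, <schema>, and <NoPriv> placeholders with the names used above. The sequence name assumes the default name that PostgreSQL generates for the bigserial column c_key, and the last column applies only if the heartbeat steps were performed.

```sql
-- The event trigger should be listed, with evtenabled = 'O' (enabled).
SELECT evtname, evtevent, evtenabled
FROM pg_event_trigger
WHERE evtname = 'attrep_intercept_ddl';

-- Each column should return true for the NoPriv account.
SELECT
  has_table_privilege('<NoPriv>', '<objects_schema>.attrep_ddl_audit', 'INSERT') AS can_insert,
  has_table_privilege('<NoPriv>', '<objects_schema>.attrep_ddl_audit', 'DELETE') AS can_delete,
  has_sequence_privilege('<NoPriv>', '<objects_schema>.attrep_ddl_audit_c_key_seq', 'USAGE') AS can_use_sequence,
  pg_has_role('<NoPriv>', 'rds_replication', 'MEMBER') AS has_replication_role,
  has_schema_privilege('<NoPriv>', '<schema>', 'USAGE') AS can_use_schema;
```

If any column returns false, repeat the corresponding GRANT statement while logged in with the account that has the rds_superuser role.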
